In [1]:
# create entry points to spark
try:
    # Stop any SparkContext left over from a previous run of this notebook.
    sc.stop()
except Exception:
    # Either `sc` is not defined yet or it is already stopped; a bare
    # `except:` would also swallow SystemExit/KeyboardInterrupt, so we
    # catch Exception only.
    pass
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sc = SparkContext()
spark = SparkSession(sparkContext=sc)
The `pyspark.sql.functions.udf()` function is a very important function. It allows us to convert a user-defined Python function into a `pyspark.sql.functions` function which can act on columns of a DataFrame. It makes data transformation much more flexible.
Using `udf()` can be tricky. The key is to understand how to define the `returnType` parameter.
In [2]:
# Import only the names this notebook actually uses instead of
# wildcard-importing everything from pyspark.sql.types; this keeps the
# namespace explicit and makes it obvious where each type comes from.
from pyspark.sql.types import (
    ArrayType,
    FloatType,
    StringType,
    StructField,
    StructType,
)
from pyspark.sql.functions import udf
In [3]:
# Load the mtcars data set; the unnamed first CSV column ('_c0') holds
# the car model name, so rename it right away.
mtcars = (
    spark.read.csv('../../data/mtcars.csv', inferSchema=True, header=True)
    .withColumnRenamed('_c0', 'model')
)
mtcars.show(5)
The structure of the schema passed to `returnType` has to match the data structure of the value returned by the user-defined function.

Case 1: divide disp by hp and put the result in a new column

The user-defined function returns a float value.
In [4]:
def disp_by_hp(disp, hp):
    """Return the ratio of displacement (disp) to horsepower (hp)."""
    ratio = disp / hp
    return ratio
In [5]:
# Wrap the plain Python function as a Spark SQL udf; the function returns
# a float, so returnType is FloatType().
disp_by_hp_udf = udf(disp_by_hp, returnType=FloatType())
In [8]:
# Column expressions for every existing column. Indexing the DataFrame
# with mtcars[name] avoids the eval() string round-trip of the original
# and also works for column names that are not valid Python identifiers.
all_original_cols = [mtcars[col_name] for col_name in mtcars.columns]
all_original_cols
Out[8]:
In [9]:
# Build a column expression that applies the udf to the disp and hp columns.
disp_by_hp_col = disp_by_hp_udf(mtcars['disp'], mtcars['hp'])
disp_by_hp_col
Out[9]:
In [11]:
# Append the derived ratio column to the list of original columns.
all_new_cols = [*all_original_cols, disp_by_hp_col]
all_new_cols
Out[11]:
In [12]:
# Select the original columns plus the new ratio column and display them.
mtcars.select(*all_new_cols).show()
Case 2: create an array column that contains the disp and hp values
In [34]:
# define function
def merge_two_columns(col1, col2):
    """Merge two scalar values into a two-element list of floats."""
    return [float(col1), float(col2)]
# Convert the user-defined function into a Spark SQL udf; the function
# returns a list of floats, so returnType is ArrayType(FloatType()).
array_merge_two_columns_udf = udf(merge_two_columns, returnType=ArrayType(FloatType()))
In [35]:
# Column expression producing a float array of [disp, hp] per row.
array_col = array_merge_two_columns_udf(mtcars['disp'], mtcars['hp'])
array_col
Out[35]:
In [36]:
# Append the array column to the list of original columns.
all_new_cols = [*all_original_cols, array_col]
all_new_cols
Out[36]:
In [37]:
# Show the first five rows without truncating the array column.
mtcars.select(*all_new_cols).show(5, truncate=False)
Both `ArrayType` and `StructType` can be used to build the `returnType` for a list. The difference is: `ArrayType` requires all elements in the list to have the same `elementType`, while a `StructType` can have different element types. A `StructType` represents a `Row` object.

Define an `ArrayType` with the `elementType` being `FloatType`.
In [38]:
# define function
def merge_two_columns(col1, col2):
    """Coerce both inputs to float and return them as a list."""
    values = [float(value) for value in (col1, col2)]
    return values
# Build an ArrayType whose elements are all FloatType, then wrap the
# merge function as a udf with that return type.
element_type = FloatType()
array_type = ArrayType(element_type)
array_merge_two_columns_udf = udf(merge_two_columns, returnType=array_type)
Define a `StructType` with one element type being `StringType` and the other being `FloatType`.
In [50]:
# define function
def merge_two_columns(col1, col2):
    """Return [str(col1), float(col2)] -- a mixed-type pair for a StructType."""
    first = str(col1)
    second = float(col2)
    return [first, second]
# Schema with one string field and one float field; it matches the
# [str, float] list returned by merge_two_columns.
fields = [
    StructField('f1', StringType()),
    StructField('f2', FloatType()),
]
struct_type = StructType(fields)
struct_merge_two_columns_udf = udf(merge_two_columns, returnType=struct_type)
Array column expression: both values are float-type values.
In [52]:
# Array column expression; note the argument order here is hp then disp.
array_col = array_merge_two_columns_udf(mtcars['hp'], mtcars['disp'])
array_col
Out[52]:
Struct column expression: the first value is a string and the second value is a float-type value.
In [53]:
# Struct column expression built from the model name and the disp value.
struct_col = struct_merge_two_columns_udf(mtcars['model'], mtcars['disp'])
struct_col
Out[53]:
Results
In [56]:
# Display the array column and the struct column side by side.
mtcars.select([array_col, struct_col]).show(truncate=False)